package 函数式编程.使用Stream;
/*
自定义收集器Characteristics使用说明

​在自定义收集器前，我们再确定下 Collector 接口函数接收参数和该实现的方法。

public interface Collector<T, A, R> {
  // A 结果类型，T 中间操作类型，R 最终返回类型，一般情况下，A=R
    Supplier<A> supplier(); // 源数据对象类型（中间操作对象类型）  A get()
    BiConsumer<A, T> accumulator();// T 中间操作类型   void accept(A a,T t)
    BinaryOperator<A> combiner(); //并发 合并部分结果   A apply(A a, A b)
    Function<A, R> finisher();// 可选，对结果集的转换    R apply(A a)
		Set<Characteristics> characteristics(); // 当前收集器的特性
  
    enum Characteristics {
        CONCURRENT,// 标识收集器是一个并发的
        UNORDERED,// 收集操作不能保证保留顺序
        IDENTITY_FINISH // 标识 finisher 就是 identity（同一性） 函数，类型转换要求必须成功。
    }
}

自定义一个set收集器,我们需要分别实现五个方法，事例代码：
*/
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;

//public class MySetCollector<T> implements Collector<T, Set<T>, Set<T>> {
public class 自定义收集器Characteristics使用说明<T> implements Collector<T, Set<T>, Set<T>> {

    @Override
    public Supplier supplier() {
        System.out.println("suppliier invoked!");
        return HashSet::new;
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        System.out.println("accmulator invoked!");

        return Set::add;
//        return HashSet::add;// 不可以写这个，因为类型可能与supplier中的类型冲突
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked!");

        return (set1, set2) -> {
            set1.addAll(set2);
            return set1;
        };
    }

  // 声明 IDENTITY_FINISH 后方法不会执行，默认是Java底层自动实现。
    @Override
    public Function<Set<T>,Set<T>> finisher() {
        System.out.println("finisher invoked!");
//        return t -> t;
//        return Function.identity();
      	throw new UnsupportedOperationException();
    }

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("characteristics invoked");
        return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, UNORDERED));
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome", "hello");
        //Set<String> collect = list.stream().collect(new MySetCollector<>());
        Set<String> collect = list.stream().collect(new 自定义收集器Characteristics使用说明<>());
        System.out.println(collect);
    }
}

/*
java底层执行过程：

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
  // 这里characteristics 会被调用第1次
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));// 调用此处方法
    }
  // 这里characteristics 会被调用第二次
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}


public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
    Supplier<I> supplier = Objects.requireNonNull(collector).supplier();//此处调用
    BiConsumer<I, ? super T> accumulator = collector.accumulator();// 调用
    BinaryOperator<I> combiner = collector.combiner();// 调用
    class ReducingSink extends Box<I>
            implements AccumulatingSink<T, I, ReducingSink> {
        @Override
        public void begin(long size) {
            state = supplier.get();
        }

        @Override
        public void accept(T t) {
            accumulator.accept(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
            state = combiner.apply(state, other.state);
        }
    }
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };
}

9.1 Characteristics 使用说明：
IDENTITY_FINISH 表明 finisher 就是 identity 函数，可以省略。如果设置，则必须是从A（中间结果类型）到R（最终结果类型）的未经检查的强制转换成功，不然就会报类型转换错误，一般如果A和R的类型一致，就可以设置，此时设置之后，就不会调用finisher，java自己进行强转

CONCURRENT 表示此收集器是并发的，简单点说，加CONCURRENT ，意味着使用 parallelStream，产生多少个线程了，都只有一个中间容器， accumulator 在执行时，由于中间容器在只有一个的情况下，要求不能有一边查询和一边修改的操作，不然会抛 ConcurrentModificationException 异常，且由于只有一个中间容器，所以不调用 combiner 定义的回调方法的。不加上CONCURRENT ，就是产生的多个线程多个容器，执行combiner合并容器。

ConcurrentModificationException：异常原因, it is not generally permissible for one thread to modify a Collection while another thread is iterating over it. 简单点说，多线程时，并发调同一对象，有的在执行添加，有的在执行查询，所以就会抛出异常。

UNORDERED 指示集合操作不承诺保留输入元素的遭遇顺序。 （如果结果容器没有内在顺序，例如Set，则可能是这样。）

下面例子说明：

package cn.zxhysy.jdk8.stream2;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class MySetCollection2<T> implements Collector<T, Set<T>, Map<T, T>> {

    @Override
    public Supplier<Set<T>> supplier() {
        System.out.println("suppliier invoked!");

        return () -> {
            // 查看中间容器有几个
            System.out.println("supplier ------------------");
            return new HashSet<>();
        };
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        System.out.println("accmulator invoked!");

        return (set, item)->{
            // 开启并发， 打印 set 有几率产生异常 ConcurrentModificationException
            // 异常原因在于，一个在遍历，一个在添加，导致异常
            System.out.println("accumulatro:"+ set + ", " +Thread.currentThread().getName());
            set.add(item);
        };
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked!");

        return (set1, set2) ->{
            System.out.println("set1："+ set1);
            System.out.println("set2："+ set2);
            set1.addAll(set2);
            return set1;
        };
    }

    @Override
    public Function<Set<T>, Map<T, T>> finisher() {
        System.out.println("finisher invoked!");

        return set -> {
            Map<T, T> map = new HashMap<>(set.size());
            set.forEach(item -> map.put(item, item));
            return map;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("characteristics invoked!");

        // 通过添加 CONCURRENT UNORDERED IDENTITY_FINISH 来看差异
        // Characteristics.CONCURRENT Characteristics.UNORDERED Characteristics.IDENTITY_FINISH
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        for(int i = 0; i< 100; i++){
            List<String> list = Arrays.asList("hello", "world", "welcome", "hello", "a", "b", "c", "d", "e", "f", "g");
            Set<String> set = new HashSet<>();
            set.addAll(list);
//        Map<String, String> map = set.parallelStream().collect(new MySetCollection2<>());
            // 并行流，与上面等价
            Map<String, String> map = set.stream().parallel().collect(new MySetCollection2<>());
            // 串行流
//        Map<String, String> map = set.stream().sequential().collect(new MySetCollection2<>());
            System.out.println(map);
        }
    }
}
*/